共计 20661 个字符,预计需要花费 52 分钟才能阅读完成。
引入
1. 创建线程的开销远远小于创建进程
创建线程的开销要远远小于创建一个进程, 因为创建进程需要操作系统申请一块内存空间, 然后将数据从硬盘读到该进程的内存空间中, 并且一个进程至少要有一个线程, 而创建一个线程只是在进程的内存空间里创建, 无需申请空间, 几乎是发起系统调用的同时一个线程就启动起来了
2. 进程之间、线程之间的关系
- 进程之间是竞争关系 : 不同的应用程序开启的不同进程共同争夺计算机硬件资源(cpu、网卡(网速) 等)
- 线程之间是协作关系 : 同一个进程内的线程都是协作关系, 一个线程将数据处理交给另一个线程处理, 不会各自干个的
3. 为何要用多线程
- 多线程必须是共用同一个进程的内存空间
- 线程比进程更轻量, 更容易创建和撤销, 创建几乎是线程的 10-100 倍, 大量线程需要动态和快速修改时, 这一特性非常重要
- 如果程序是 I/O 密集型, 那么大量的线程彼此重叠运行, 将会提升程序运行的效率
- 在多核操作系统中, 为了最大限度的利用多核资源, 可以开启多个线程, 这比开进程的开销要小的多(Python 中不适用: 因为 CPython 的原因, 存在 GIL, 同一时刻统一进程只有一个线程被执行, 后面细说)
4. 线程使用原理示例
我们使用的 Pycharm 编辑器, 一边敲代码, 一边编辑器自动帮你保存
- 场景一 : 我们使用 多进程 来做这件事, 进程与进程之间内存空间相互隔离, 如果我们要接收用户输入的代码, 然后将其保存到硬盘, 就需要申请内存空间来开设进程资源, 再创建一个管道, 将用户输入的数据放进去, 专门干保存数据的进程再从管道里拿数据
- 场景二 : 使用 多线程 来做这件事, 多线程之间共享同一个进程的内存空间, 数据的交互就变得十分方便, 并且线程创建所需资源远远小于进程
一.threading 模块介绍
前面我们所学的多进程管理模块 multiprocessing, 它其实是完全模仿了threading 模块的接口, 两者的使用非常相似, 下面一一展开介绍
二. 开启线程的两种方式
1. 方式一
current_thread().name
: 查看线程名字, 默认 Thread-1
from threading import Thread,current_thread
import os
n = 100
def task(name):
global n
n = 22
print(f" 子线程 : {name} {os.getpid()} {current_thread().name} {n}")
if __name__ == '__main__':
p = Thread(target=task,args=(" 派大星 ",))
p.start()
p.join()
print(f" 主线程 : {os.getpid()} {current_thread().name} {n}")
''' 输出
子线程 : 派大星 120 Thread-1 22
主线程 : 120 MainThread 22
'''
- 方式二 : 自定义类
from threading import Thread,current_thread
import os
n = 100
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self) -> None:
global n
n = 22
print(f" 子线程 : {self.name} {os.getpid()} {current_thread().name} {n}")
if __name__ == '__main__':
p = Mythread(" 海绵宝宝 ")
p.start()
p.join()
print(f" 主线程 : {os.getpid()} {current_thread().name} {n}")
''' 输出
子线程 : 海绵宝宝 14664 海绵宝宝 22
主线程 : 14664 MainThread 22
'''
三. 对比一个进程下开多个进程和多个线程的区别
1. 开多个进程与线程时间上的对比
🍑多进程
from multiprocessing import Process
import os,time
def task():
print(" 多进程 ")
if __name__ == '__main__':
pc_li = []
start_time = time.time()
for i in range(10):
p = Process(target=task)
p.start()
pc_li.append(p)
for i in pc_li:
i.join()
print(" 用时:%s"%(time.time()-start_time)) # 用时:1.005824089050293
print(f"{os.getpid()}end")
🍑多线程
from threading import Thread
import os,time
def task():
print(" 多线程 ")
if __name__ == '__main__':
pc_li = []
start_time = time.time()
for i in range(10):
p = Thread(target=task)
p.start()
pc_li.append(p)
for i in pc_li:
i.join()
print(" 用时:%s"%(time.time()-start_time)) # 用时:0.0019948482513427734
print(f"{os.getpid()}end")
🔰 10 个进程用时 1 秒多, 10 个线程用时 0.002 秒
2. 查看多进程和多线程的 PID
🍑多进程
from multiprocessing import Process
import os,time
def task():
print(f" 子进程 PID:{os.getpid()}")
if __name__ == '__main__':
pc_li = []
start_time = time.time()
for i in range(3):
p = Process(target=task)
p.start()
pc_li.append(p)
for i in pc_li:
i.join()
print(f" 主进程 PID:{os.getpid()}")
''' 输出
子进程 PID:14064
子进程 PID:16424
子进程 PID:9904
主进程 PID:15184
'''
🍑多线程
from threading import Thread
import os,time
def task():
print(f" 子线程 PID:{os.getpid()}")
if __name__ == '__main__':
pc_li = []
start_time = time.time()
for i in range(3):
p = Thread(target=task)
p.start()
pc_li.append(p)
for i in pc_li:
i.join()
print(f" 主线程 PID:{os.getpid()}") # 主线程也可以说是主进程
''' 输出
子线程 PID:832
子线程 PID:832
子线程 PID:832
主线程 PID:832
'''
🔰 主进程与其下开启的子进程 "PID" 都各不相同, 而主线程与其下开启的子线程 "PID" 都相同
3. 验证同一进程下的多线程是否共享该进程的数据
from threading import Thread
import time
x = 22
def task():
global x
x = 100 # 将数据改成 100
print(f" 子线程 1 打印:{x}")
def task2():
time.sleep(0.1) # 保证进程 2 是在进程 1 之后取 x
print(f" 子线程 2 打印:{x}")
if __name__ == '__main__':
p = Thread(target=task)
p2 = Thread(target=task2)
p.start()
p2.start()
p.join()
p2.join()
print(f" 主进程打印:{x}")
''' 输出
子线程 1 打印:100
子线程 2 打印:100
主进程打印:100
'''
🔰证明多线程是共享同一个进程内存空间数据的
四. 使用多线程并发 socket 通信(tcp)
示例比较简单, 更多功能自行添加
1. 服务端
from threading import Thread
from socket import *
s = socket(AF_INET,SOCK_STREAM)
s.bind(("127.0.0.1",8090))
s.listen(5)
def connection(conn):
while 1:
try:
date = conn.recv(1024)
print(date.decode("utf-8"))
conn.send(" 阿巴阿巴 ".encode("utf-8"))
except Exception:
break
if __name__ == '__main__':
while 1:
conn,addr = s.accept()
p = Thread(target=connection,args=(conn,))
p.start()
2. 客户端 (可以多台客户端)
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(("127.0.0.1",8090))
while True:
inp = input(">>").strip()
if len(inp) == 0:continue
c.send(inp.encode("utf-8"))
date = c.recv(1024)
print(date.decode("utf-8"))
五. 使用多线程实现类似编辑器的功能
需求 : 1. 接收用户输入, 2. 将用户输入进行 upper 操作, 3. 将大写的数据保存到文件
from threading import Thread
inp_li = []
upp_li = []
def inp(): # 接收用户输入
while 1:
date = input(" 用户输入 >>").strip()
if len(date) == 0:continue
inp_li.append(date)
def upp(): # 将输入变成大写
while 1:
if inp_li:
date = inp_li.pop()
upp_li.append(date.upper())
def file(): # 将大写内容存入文件
while 1:
if upp_li:
with open("a.txt","a",encoding="utf-8")as f:
f.write(f"{upp_li.pop()}\n")
if __name__ == '__main__':
i = Thread(target=inp)
u = Thread(target=upp)
f = Thread(target=file)
i.start()
u.start()
f.start()
六. 线程对象的 join 方法
同进程 join 方法一样, 让主线程等待子线程运行完毕后再运行
from threading import Thread,current_thread
import time
def task():
time.sleep(2)
print(f" 子线程 {current_thread().name} 运行完毕 ")
if __name__ == '__main__':
p = Thread(target=task)
p.start()
p.join()
print(" 主线程 --->")
''' 输出
子线程 Thread- 1 运行完毕
主线程 --->
'''
七. 线程相关的其他方法
1.Thread 对象的方法
方法 | 作用 |
---|---|
p.is_alive() | 返回线程是否存活 |
p.getName() | 返回线程名 |
p.setName() | 修改线程名 |
2.threading 模块的方法
方法 | 作用 |
---|---|
threading.currentThread() | 返回当前的线程变量 |
threading.enumerate() | 返回一个包含正在运行线程的列表, 也就是存活的线程 |
threading.activeCount() | 返回正在运行的线程数量, 等价于:len(threading.enumerate()) |
3. 以上方法演示
from threading import Thread,current_thread
import threading
import time
def task():
print(f" 子线程打印:{current_thread().getName()}")
time.sleep(10)
if __name__ == '__main__':
p = Thread(target=task)
p.start()
time.sleep(0.1)
print(p.is_alive()) # True
print(p.getName()) # Thread-1
p.setName(" 派大星 ")
print(p.getName()) # 派大星
p2 = Thread(target=task)
p3 = Thread(target=task)
p2.start()
p3.start()
print(threading.current_thread().name) # 获取主线程名字
print(threading.enumerate()) # 包含主线程在内的所有正在运行的进程
print(threading.active_count()) # 线程对象列表
''' 输出
子线程打印:Thread-1
True
Thread-1
派大星
子线程打印:Thread-2
子线程打印:Thread-3
MainThread
[<_MainThread(MainThread, started 3800)>, <Thread(派大星, started 2704)>, <Thread(Thread-2, started 18388)>, <Thread(Thread-3, started 8448)>]
4
'''
4. 验证主线程等待子线程运行完毕
from threading import Thread,current_thread
import time
def task():
time.sleep(2)
print(f" 子线程 {current_thread().name} 运行完毕 ")
if __name__ == '__main__':
p = Thread(target=task)
p.start()
print(" 主线程 --->")
print(p.is_alive())
''' 输出
主线程 --->
True
子线程 Thread- 1 运行完毕
'''
八. 守护线程
1. 进程守护进程与线程守护进程
- 对于 主进程 来讲, 守护进程守护的是主进程的代码, 主进程代码运行完毕, 则守护进程就终止, 之后如果还有非守护子进程在运行, 那么主进程会一直等待其运行完毕后回收该子进程的资源, 不然就会产生僵尸进程
- 对于 主线程 来讲, 守护线程守护的是主线程的整个生命周期, 主线程需要等待其他非守护线程运行完毕才算完毕, 完毕的同时守护线程也被回收, 主线程的结束也就意味着进程的结束, 之后进程整体的资源将被回收, 而进程必须保证非守护线程都运行完毕后才能结束
2. 守护线程
from threading import Thread,current_thread
import time
def task():
time.sleep(4)
print(f" 子线程 {current_thread().name} 结束 ")
def task2():
print(f" 子线程 {current_thread().name} 结束 ")
if __name__ == '__main__':
p = Thread(target=task)
p2 = Thread(target=task2)
p.setDaemon(True) # 设置守护线程, 需在 start() 之前设置
p.start()
p2.start()
print(" 主线程 --->")
print(p.is_alive())
''' 输出
子线程 Thread- 2 结束主线程 --->
True
'''
🔰主线程和非守护线程都已经结束了, 于是把还未运行完的守护进程带走了
九. 线程同步锁(互斥锁 / 排他锁)
与进程概念同步锁相同, 主要用于解决多个人同时操作同一份文件造成的数据安全性问题, 我们还是来举个例子 :
- 访问一个数据, 先拿到数据访问计数, 访问一次修改计数加 1, 再返回修改后的数据
from threading import Thread
import time
box = 0
def children():
global box
temp = box # 拿到数据计数
time.sleep(1) # 模拟网络延迟
temp += 1 # 访问计数加一
box = temp # 返回改后数据
if __name__ == '__main__':
li = []
for i in range(100): # 100 个用户
p = Thread(target=children)
p.start()
li.append(p)
for i in li:
i.join()
print(box) # 1
得到的结果是 1? 不应该是 100 吗? 这就是数据的安全性问题, 我们可以通过加锁, 让同一时间只能有一个人进行数据的修改
from threading import Thread,Lock
import time
mutex = Lock()
box = 0
def children():
mutex.acquire() # 加锁
global box
temp = box # 拿到数据计数
time.sleep(0.1) # 模拟网络延迟
temp += 1 # 访问计数加一
box = temp # 返回改后数据
mutex.release() # 解锁
if __name__ == '__main__':
li = []
for i in range(100): # 100 个用户
p = Thread(target=children)
p.start()
li.append(p)
for i in li:
i.join()
print(box) # 100
ps : 加锁的另一种写法(通过上下文管理器)
mutex = Lock()
def children():
with mutex:
global box
temp = box # 拿到数据计数
time.sleep(0.1) # 模拟网络延迟
temp += 1 # 访问计数加一
box = temp # 返回改后数据
十. 信号量 Semaphore (了解)
线程信号量与进程信号量一模一样, 概念不在赘述, 下面举个例子 :
- 一个小网吧, 最多容纳 3 个网瘾少年, 可以同时有十个少年上机玩游戏, 其他人只能等着, 有些人完的时间短下机了, 那么就空出了一个位置(锁), 外边等的人就可以抢这个位置了
from threading import Thread,Semaphore,current_thread
import time,random
sm = Semaphore(3)
def young():
with sm:
print(f" 少年 {current_thread().name} 正在打电脑 ")
time.sleep(random.randint(1,3))
print(f" 少年 {current_thread().name} 下机了 ")
if __name__ == '__main__':
li = []
for i in range(7):
p = Thread(target=young)
p.start()
li.append(p)
for i in li:
i.join()
print(" 网吧老板被抓 ")
''' 输出
少年 Thread- 1 正在打电脑
少年 Thread- 2 正在打电脑
少年 Thread- 3 正在打电脑
少年 Thread- 2 下机了
少年 Thread- 4 正在打电脑
少年 Thread- 4 下机了
少年 Thread- 5 正在打电脑
少年 Thread- 3 下机了
少年 Thread- 1 下机了
少年 Thread- 6 正在打电脑
少年 Thread- 7 正在打电脑
少年 Thread- 5 下机了
少年 Thread- 6 下机了
少年 Thread- 7 下机了
网吧老板被抓
Process finished with exit code 0
'''
十一. 死锁与递归锁
1. 什么是死锁
所谓死锁 : 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
- 老板与员工交易示例, 揭秘公司两个财务为何大打出手(剧情为便于理解纯属虚构)
#🔹老板向员工发工资, 员工向老板还钱, 财务负责处理金额问题, 处理财务必须获得个人令牌,
#🔹老板 ---> 员工 : 财务 1 先获得了老板令牌, 再获得员工两人的令牌, 进行第一次金额交易后归还了各自令牌
#🔹员工 ---> 老板 : 紧接着财务 1 看看账目, 发现员工要向老板还钱, 于是先获取员工的令牌, 于此同时财务 2 在另外一本账目上处理一笔老板向员工的转账
#🔹于是乎财务 2 将老板的令牌拿走了, 财务 1 没拿到, 而财务 2 想拿员工的令牌, 发现令牌在财务 1 手上, 于是两人争执不过就大打出手了
from threading import Thread,Lock
import time
boss = Lock()
staff = Lock()
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def deal1(self):
boss.acquire() # 加老板锁
print(f"{self.name}获取了老板令牌 ")
staff.acquire() # 加员工锁
print(f"{self.name}获取了员工令牌 ")
print(f"{self.name}操控了交易 ")
staff.release() # 释放老板锁
boss.release() # 释放员工锁
def deal2(self):
staff.acquire() # 加员工锁
print(f"{self.name}获得了员工令牌 ")
print(" 正在去老板办公室...")
time.sleep(1)
boss.acquire() # 加老板锁
print(f"{self.name}获得了老板令牌 ")
print(f" 没内鬼,{self.name}继续交易 ")
boss.release() # 释放老板锁
staff.release() # 释放员工锁
def run(self):
self.deal1()
self.deal2()
if __name__ == '__main__':
p1 = Mythread(" 财务 1 ")
p2 = Mythread(" 财务 2 ")
p1.start()
p2.start()
''' 输出
财务 1 获取了老板令牌
财务 1 获取了员工令牌
财务 1 操控了交易
财务 1 获得了员工令牌
正在去老板办公室...
财务 2 获取了老板令牌
'''
🔰出现死锁, 程序就此停在这无法再运行下去
解决方法使用递归锁
2. 什么是递归锁
递归锁, 在 Python 中为了支持在同一线程中多次请求同一资源, python提供了可重入锁 RLock, 这个RLock 内部维护着一个 Lock 和一个 counte r 变量, counter 记录了 acquire 的次数,从而使得资源可以被多次 require, require 一次技术就加 1, 直到一个线程所有的acquire 都被release,其他的线程才能获得资源
- 使用递归锁解决财务纠纷问题
from threading import Thread,Lock,RLock
import time
boss = staff = RLock() # 让老板锁和员工锁变成了一把锁
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def deal1(self):
boss.acquire() # 锁计数 +1 = 1
print(f"{self.name}获取了老板令牌 ")
staff.acquire() # 锁计数 +1 = 2
print(f"{self.name}获取了员工令牌 ")
print(f" 有内鬼,{self.name}继续交易 ")
staff.release() # 锁计数 -1 = 1
boss.release() # 锁计数 -1 = 0
def deal2(self):
staff.acquire() # 锁计数 +1 = 1
print(f"{self.name}获得了员工令牌 ")
print(" 正在去老板办公室...")
time.sleep(1)
boss.acquire() # 锁计数 +1 = 2
print(f"{self.name}获得了老板令牌 ")
print(f" 没内鬼,{self.name}继续交易 ")
boss.release() # 锁计数 -1 = 1
staff.release() # 锁计数 -1 = 0
def run(self):
self.deal1()
self.deal2()
if __name__ == '__main__':
p1 = Mythread(" 财务 1 ")
p2 = Mythread(" 财务 2 ")
p1.start()
p2.start()
''' 输出
财务 1 获取了老板令牌
财务 1 获取了员工令牌
有内鬼, 财务 1 继续交易
财务 1 获得了员工令牌
正在去老板办公室...
财务 1 获得了老板令牌
没内鬼, 财务 1 继续交易
财务 2 获取了老板令牌
财务 2 获取了员工令牌
有内鬼, 财务 2 继续交易
财务 2 获得了员工令牌
正在去老板办公室...
财务 2 获得了老板令牌
没内鬼, 财务 2 继续交易
Process finished with exit code 0
'''
🔰正常完成任务
十二. 事件 (Event)(了解)
线程之间状态同步, 两个不同的任务执行, 一个任务如果需要另一个任务执行之后才能开始执行, 那么这个待执行的任务是如何获取到上一个任务执行状态的呢? 如果是进程, 那就需要借助共享内存传递一个标志信号, 而线程本身就共享一个线程的内存空间, 所以, 为了解决以上问题, threading 模块为我们提供了一个 Event 对象
1.Event 对象的方法
Event 本质就是一个标志, True或者 False, 而它提供的wait 函数可以阻塞当前线程, 直到状态从 False 变为True
- 导入方法 :
from threading import Event
方法 | 作用 |
---|---|
event.isSet() | 返回 event 的状态 |
event.wait() | 状态值为 False 为阻塞, 默认 False |
event.set() | 设置 event 的状态值为 True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度 |
event.clear() | 恢复 event 的状态值为 False |
2. 红绿灯示例
- 红灯停(False), 绿灯行(True)
from threading import Event,Thread,current_thread
import time
import random
e = Event() # 默认 False
def f1(): # 红绿灯机制
while 1:
e.clear() # 先将其置为 False
print(" 红灯亮 ")
time.sleep(3) # 模拟红灯等待三秒
e.set() # 将其设置为 True
print(" 绿灯亮 ")
time.sleep(2) # 模拟绿灯通行两秒
def f2(): # 模拟行人
while 1:
if e.is_set(): # 判断如果是 True, 则通行
print(f"{current_thread().name}行人正在通行 ")
break # 行人走后结束这个线程
else:
print(f"{current_thread().name}正在等待 ")
e.wait() # 正在阻塞状态, 当 event 变为 True 时就激活
if __name__ == '__main__':
Thread(target=f1).start() # 创建一个红绿灯线程
while 1:
time.sleep(random.randint(0,2))
Thread(target=f2).start() # 随机时间产生一个行人
''' 输出
红灯亮
Thread- 2 正在等待
Thread- 3 正在等待
Thread- 4 正在等待
绿灯亮
Thread- 2 行人正在通行
Thread- 4 行人正在通行
Thread- 3 行人正在通行
Thread- 5 行人正在通行
红灯亮
Thread- 6 正在等待
绿灯亮
Thread- 6 行人正在通行
......
....
```
十三. 定时器 (Timer)
指定 n 秒后执行操作
- 用法 : Timer(float,function,args=(,))
- 参数 : 指定时间、函数名、函数参数
from threading import Timer
import time
def test(name):
print(f"{name}黑猫警长, 用时:{time.time()-start_time}")
start_time = time.time()
p = Timer(3.5,test,args=(" 葫芦娃 ",))
p.start()
''' 输出
葫芦娃黑猫警长, 用时:3.5001583099365234
'''
十四. 线程 Queue
队列主要用于进程与进程之间的通信, 因为它们内存空间不共享, 线程之间数据是共享的, 使用队列是因为队列的实现原理是管道 + 锁, 它能保证数据的安全性
ps : 目前我们使用的队列都只能单机测试使用, 以后我们使用的是基于网络的别人封装好了的队列框架
用法 : import queue
, 使用方式与进程 queue 无异, 以下介绍 queue 模块下三种类的使用
1. 先进先出 (FIFO) 队列 Queue
import queue
q = queue.Queue(3)
q.put("shawn")
q.put(123)
q.put([1,2,3,4])
try:
q.put({"name":"shawn"},block=False)
except Exception:
print(" 放满了 --->")
print(q.get())
print(q.get())
print(q.get())
try:
print(q.get())
except Exception:
print(" 取完了 --->")
''' 输出
放满了
shawn
123
[1, 2, 3, 4]
取完了
'''
2. 后进先出 (LIFO) 堆栈 LifoQueue
import queue
q = queue.LifoQueue(3)
q.put("shawn")
q.put(123)
q.put([1,2,3,4])
print(q.get())
print(q.get())
print(q.get())
''' 输出
[1, 2, 3, 4]
123
shawn
'''
3. 优先级队列 PriorityQueue
- 用法 :
put(([等级],[数据]))
- 等级小的优先级越高
import queue
q = queue.PriorityQueue(3)
q.put((45,"shawn"))
q.put((10,123))
q.put((-20,[1,2,3,4]))
print(q.get())
print(q.get())
print(q.get())
''' 输出
(-20, [1, 2, 3, 4])
(10, 123)
(45, 'shawn')
'''
十五.GIL 锁(重点)
1.GIL 的定义
Global Interpreter Lock 直译为 全局解释器锁
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
结论 : 在 Cpython 解释器中, 同一个进程下开启的多线程, 同一时刻只能有一个线程执行, 无法利用多核优势
2.GIL 不是 Python 语言的特性
GIL 是 Python 解释器 (Cpython) 时引入的概念,在 JPython、PyPy、Psyco 中没有 GIL。GIL 并不是 Python 的语言缺陷, Python 完全可以不依赖与 GIL
3. 为什么会存在 GIL
GIL 本质就是一把互斥锁, 所有的互斥锁的本质都一样, 将并发运行变成串行, CPython 在执行多线程的时候 并不是线程安全 的,所以为了程序的稳定性,加一把全局解释锁,能够确保任何时候都只有一个 Python 线程执行,
4. 为什么说并不是线程安全的
我们知道执行一个 Python 文件, 会将 Python 代码交给解释器运行, 调用的是解释器的功能, 每执行一个文件, 都会产生一个独立的进程, 该进程内不仅仅只有运行 Python 代码的进程, 还有解释器开启的垃圾回收等解释器级别的进程, 所有线程都运行在这同一个进程之内
产生的问题就是 : 假设你定义一个变量 x = 10
, 先申请一个内存空间, 将 10 放进内存空间中, 当你转身想让变量名去绑定 10 的内存地址的时候, 垃圾回收机制刚好启动起来了, 扫描到 10 的引用计数为 0, 于是就被清理掉了
转个身就没了??
解决这个问题的方法就是加锁, 保证 python 解释器同一时间只能执行一个任务的代码
原理 : 想要运行 Python 代码, 那就必须先拿到解释器锁, 相当于是拿到了 CPU 的使用权限, 然后在运行 Python 代码, 在 Python 代码中可以加自己的锁, GIL 保护的是解释器级别的数据(内存管理数据等)
值得肯定的一点是 : 保护不同的数据安全, 就应该加上不同的锁
5.GIL 与 Lock 的区别
我们以一个百人改值的示例来解释两者的不同
from threading import Thread,Lock
import time
mutex = Lock()
count = 100
def change():
global count
mutex.acquire() # 加锁
temp = count
time.sleep(0.1) # 模拟 I /O
temp -= 1 # 修改 -1
count = temp
mutex.release() # 释放锁
if __name__ == '__main__':
li = []
for i in range(100):
p = Thread(target=change)
p.start()
li.append(p)
for i in li:
i.join()
print(count) # 0
原理分析 : 同时开启了 100 个线程, 都去抢 GIL 锁, 假设线程 1 抢到了, 那么线程 1 就获取了 CPU 的执行权限, 将会被分配 CPU, 而其他的线程只能等待, 接着线程 1 运行 Python 代码, 先是加了一把互斥锁、赋值了一下变量, 然后 sleep(I/O), 这时操作系统发现你正在做 I /O, 立马把 CPU 资源调走并强行让其释放 GIL, 让别的线程抢夺, 假设线程 2 抢到了, 它就获得了 CPU 执行权限, 他也运行 Python 代码, 紧接着声明全局变量 count, 下一步发现被加锁了, 阻塞在原地, 于是被操作系统查觉到了, 立马又把 CPU 拿走并强行释放 GIL 给别的线程争抢, 但都像线程 2 一样抢到了也没啥用(相当于是降低了效率). 等到线程 1 的 I / O 做完了, 于是也参与抢锁, 操作系统会让它很快的抢到锁, 抢到之后接着运行上次运行的代码, 将 count 值减 1、释放 Lock、释放 GIL. 于是其他线程又开始了争夺, 直到所有的线程都运行完毕
6.GIL 对多线程产生的影响
有了 GIL 的存在,同一时刻同一进程中只有一个线程被执行, 但并不是说多核优势就没用了, 这需要看情况而定 : 对于一个程序来说, 要么就是计算密集型, 要么就是 I / O 密集型
- GIL 对 计算密集型程序 会产生影响 : 因为计算密集型的程序, 需要占用系统资源, CPU 越多越好, 效率总会提升, 但 GIL 的存在,相当于始终在进行单线程运算,这样自然就慢了
- 而对于 I/ O 密集型程序 来说, 程序大部分时间在等待, 所以它们是多个一起等 (多线程) 还是单个等 (单线程) 都得等
7. 并发任务的处理方案
现有四个任务, 要让其并发, 有哪些方案可选 :
- 方案一 : 开启四个单线程的进程
- 方案二 : 开启四个线程的单进程
- 方案三 : 开启两个进程, 每个进程两个线程, 或者一个进程三个线程, 一个进程单线程(混着来)
🍑单核情况下
四个任务都是计算密集型, 开启进程消耗大, 使用方案二
四个任务都是 I / O 密集型, 开启进程消耗大, 并且进程的切换速度远不如线程,使用方案二
🍑多核情况下
四个任务都是计算密集型, 多核意味着并行计算, 在 Cpython 中一个进程中同一时刻只有一个线程执行, 没有多核优势, 使用方案一
四个任务是都 I / O 密集型, 多核解决不了 I / O 问题, 该等还得等, 开进程消耗大, 使用方案二
8. 多进程与多线程对不同应用场景的性能对比
- 运算密集型对比
from multiprocessing import Process
from threading import Thread
import time,os
🍑多进程测试
def calculate():
count = 0
while count < 30000000:
count +=1
if __name__ == '__main__':
li = []
print(os.cpu_count()) # 查看 CPU 个数 --->4
start_time = time.time()
for i in range(4):
p = Process(target=calculate)
p.start()
li.append(p)
for i in li:
i.join()
print(f" 多进程用时:{time.time()-start_time}")
#🔰多进程用时:5.714418649673462
🍑多线程测试
def calculate():
count = 0
while count < 30000000:
count +=1
if __name__ == '__main__':
li = []
print(os.cpu_count()) # 查看 CPU 个数 --->4
start_time = time.time()
for i in range(4):
p = Thread(target=calculate)
p.start()
li.append(p)
for i in li:
i.join()
print(f" 多线程用时:{time.time()-start_time}")
#🔰多线程用时:9.828791618347168
- I/ O 密集型对比
from multiprocessing import Process
from threading import Thread
import time,os
🍑多进程测试
def calculate():
time.sleep(2)
if __name__ == '__main__':
li = []
print(os.cpu_count()) # 查看 CPU 个数 --->4
start_time = time.time()
for i in range(500):
p = Process(target=calculate)
p.start()
li.append(p)
for i in li:
i.join()
print(f" 多进程用时:{time.time()-start_time}")
#🔰多进程用时:40.83470439910889
🍑多线程测试
def calculate():
time.sleep(2)
print(" 完毕 --->")
if __name__ == '__main__':
li = []
print(os.cpu_count()) # 查看 CPU 个数 --->4
start_time = time.time()
for i in range(500):
p = Thread(target=calculate)
p.start()
li.append(p)
for i in li:
i.join()
print(f" 多线程用时:{time.time()-start_time}")
#🔰多线程用时:2.056976556777954
以上实验结论 : Cpython 对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于 I/O 密集型的任务效率还是有显著提升的
- 应用场景
多线程 主要运用于 I / O 密集型 : socket, 爬虫, web
多进程 主要运用于计算密集型 : 金融分析, 比特币挖矿
十六. 线程池概念
1. 什么是线程池
与进程池类似, 线程池是在系统启动时就先创建大量空闲的线程, 程序提交一个任务给线程池, 线程池便会调用一个线程来执行该任务, 当任务运行完毕后, 该线程并不会关闭, 而是返回到线程池中再次变为空闲状态等待下一个提交的任务,
2. 为什么使用线程池
虽说线程的启动相比较于进程开销非常小, 但毕竟也是需要向操作系统发起调用, 我们使用线程在一些情况下能更好的提升性能, 尤其是程序中有大量生命期短暂的线程时, 使用线程池最为合适了
3. 线程池的作用
使用线程池可以精确控制操作系统中并发线程的数量, 如果操作系统中有大量的并发线程, 并且没有限制数量, 那么就会导致操作系统的性能急剧下降, 甚至导致程序的崩溃, 而线程池可以通过控制最大线程数来解决该问题
十七. 线程池的使用 (concurrent.futures 模块)
1.concurrent.futures 模块
concurrent.futures模块是 Python 标准库模块, 它提供了 Executor 类, 而 Executor又提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池, 也可以把这两个子类看作 高度封装的异步调用接口
2. 常用方法
方法 | 说明 |
---|---|
submit(fn, args, *kwargs) | 将任务 fn 提交给进程池(异步提交), 后面的是参数 |
map(func, *iterables, timeout=None, chunksize=1) | 类似于 map 函数, 升级版本开启多进程以异步的方式来对可迭代对象进行 map 处理, 也类似于取代 for 循环 submit 的操作 |
shutdown(wait=True) | 关闭线程池, (下面进行详解) |
result(timeout=None) | 获得该线程任务的结果, 如果任务未执行完会进行阻塞, 参数为超时时间 |
add_done_callback(fn) | 为该线程设置回调, 当该线程完成任务时, 程序会自动触发 fn 函数 |
shutdown(wait=True)
详解:
相当于进程池的 pool.close() + pool.join() 操作
wait=True, 等待池内所有任务执行完毕回收完资源后才继续
wait=False, 立即返回, 并不会等待池内的任务执行完毕
但不管 wait 参数为何值,整个程序都会等到所有任务执行完毕
submit和 map 必须在 shutdown 之前
注意 : 在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列, 调用 shutdown() 方法后的线程池不再接收新任务, 但会将以前所有的已提交任务执行完成, 当线程池中的所有任务都执行完成后, 该线程池中的所有线程都会死亡
3. 使用 ThreadPoolExecutor 线程池启动线程任务步骤
- 使用 ThreadPoolExecutor 类创建一个线程池对象
- 创建一个任务task (普通函数)
- 调用线程池对象的 submit() 方法来提交任务
- 当没有任务的时候调用 shutdown() 方法来关闭进程池
4.ThreadPoolExecutor 使用示例
ps :ProcessPoolExecutor 与 ThreadPoolExecutor 的使用方法一模一样
from concurrent.futures import ThreadPoolExecutor
import os,time,random
def task(n):
print(f" 子线程:{os.getpid()}正在执行 ")
time.sleep(random.randint(1,3)) # 模拟任务执行时间
return n**2
if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(max_workers=4) # 设置线程池大小
futures = []
for i in range(1,10):
future = thread_pool.submit(task,i) # 开启十个任务
futures.append(future)
thread_pool.shutdown(True) # 关闭线程池, 并等待任务结束
for future in futures:
print(future.result()) # 循环取出任务运行的结果(等到左右的任务执行完后才拿到)
''' 输出
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
子线程:1360 正在执行
1
4
9
16
25
36
49
64
81
'''
5. 回调函数的使用 (add_done_callback())
submit()
提交任务 future, 当future 调用resule()
方法, 会阻塞当前主线程, 等到所有线程完成任务后, 该阻塞才会解除, 于是拿到的结果入上面的示例一样, 先运行完十个任务, 再拿到十个结果- 如果不想让
resule()
方法将线程阻塞, 那么就可以使用 future 的add_done_callback()
来添加回调函数, 当线程任务结束后, 程序会自动触发该回调函数, 并将 future 对象结果作为参数传给回调函数, 那我们可以在其内直接打印结果
from concurrent.futures import ThreadPoolExecutor
import os,time,random
def task(n):
print(f" 子线程:{os.getpid()}正在执行 ")
time.sleep(random.randint(1,3))
return n**2
def resule_back(res):
print(res.result()) # 打印任务运行的结果(不需要等待其他线程任务完成)
if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(max_workers=4)
for i in range(1,10):
future = thread_pool.submit(task,i)
future.add_done_callback(resule_back) # 设置回调函数
thread_pool.shutdown(True) # 关闭线程池
''' 输出
子线程:17164 正在执行
子线程:17164 正在执行
子线程:17164 正在执行
子线程:17164 正在执行
4
子线程:17164 正在执行
1
子线程:17164 正在执行
9
1625
子线程:17164 正在执行
子线程:17164 正在执行
36
子线程:17164 正在执行
8164
49
'''
6.map() 方法的使用
map()
方法第一个参数是函数, 第二个是可迭代对象, 第三个是超时时间- 这种方法相当于是开启
len(iterable)
个线程, 等于是替代了 for + submit() 这两步 map()
方法得到的结果是一个生成器 (generator) 对象, 可以使用list()
函数造成列表- 优点 : 代码简单, 最后收集任务运行的结果, 仍然与传入参数的顺序一样
from concurrent.futures import ThreadPoolExecutor
import os,time,random
def task(n):
print(f" 子线程:{os.getpid()}正在执行 ")
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(max_workers=4)
# 相当于替代了 for+submit(), 返回的是一个可 list 的对象
future = thread_pool.map(task,range(1,10))
thread_pool.shutdown(True) # 关闭线程池
print(future)
print(list(future)) # [1, 4, 9, 16, 25, 36, 49, 64, 81]
''' 输出
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
子线程:1096 正在执行
<generator object Executor.map.<locals>.result_iterator at 0x000001EDF4C72748>
[1, 4, 9, 16, 25, 36, 49, 64, 81]
'''
7. 使用回调函数爬取各网站大小示例
from concurrent.futures import ThreadPoolExecutor
import requests,os
def get_htm(url):
print(f" 线程:{os.getpid()}正在获取网站:{url}源码 ")
response = requests.get(url)
if response.status_code == 200:
return {"url":url,"text":response.text}
else:
return {"url":url,"text":""}
def parse_htm(back):
res = back.result()
print(f" 线程:{os.getpid()}正在解析网站:{url}源码 ")
with open("html_size.txt","a")as f:
f.write(f"url:{res['url']},size:{len(res['text'])}\n")
if __name__ == '__main__':
urls=[
'https://zhuanlan.zhihu.com',
'https://www.cnblogs.com',
'https://www.python.org',
'https://blog.csdn.net',
'http://www.china.com.cn',
]
li = []
thread_pool = ThreadPoolExecutor(3)
for url in urls:
future = thread_pool.submit(get_htm,url)
future.add_done_callback(parse_htm)
li.append(future)
thread_pool.shutdown(True)
''' 输出
线程:6392 正在获取网站:https://zhuanlan.zhihu.com 源码
线程:6392 正在获取网站:https://www.cnblogs.com 源码
线程:6392 正在获取网站:https://www.python.org 源码
线程:6392 正在解析网站:http://www.china.com.cn 源码
线程:6392 正在获取网站:https://blog.csdn.net 源码
线程:6392 正在解析网站:http://www.china.com.cn 源码
线程:6392 正在获取网站:http://www.china.com.cn 源码
线程:6392 正在解析网站:http://www.china.com.cn 源码
线程:6392 正在解析网站:http://www.china.com.cn 源码
线程:6392 正在解析网站:http://www.china.com.cn 源码
'''
- 查看下文件